# Alexander F. Gazmararian
# agazmararian@gmail.com

library(here)
library(tidyverse)
library(tidylog)
library(lubridate)
library(sf)
library(terra)
library(tigris)
library(parallel)

# Replication mode detection ----
# In anonymized mode, skip distance calculations and load cached results
source(here("R", "visibility", "replication_mode.R"))
REPLICATION_MODE <- init_replication_mode()

if (REPLICATION_MODE == "anonymized") {
  message("=== ANONYMIZED MODE ===")
  message("Loading cached distance calculations (coordinates not available)")
  
  cache_path <- get_distance_cache_path()
  if (!file.exists(cache_path)) {
    stop("Anonymized mode requires cached distance data. File not found: ", cache_path)
  }
  
  distance_master <- readRDS(cache_path)
  message("Loaded ", nrow(distance_master), " cached distance records")
  message("Skipping distance calculations - using cached data")
  
  # Skip the rest of the script - distance_master is already loaded
  # The script will end here in anonymized mode
} else {
  message("=== FULL MODE ===
")
  message("Computing distance calculations from coordinates")
}

# The rest of the script only runs in full mode
if (REPLICATION_MODE == "full") {

# Configuration ----
# Using Albers Equal Area projection for accurate distance calculations across the continental US
crs.proj <- "ESRI:102003"  # North America Albers Equal Area Conic
coord.crs <- 4326
project_year_cutoff <- 2024 # for efficient merging

# Cross-platform parallel processing configuration ----
# mclapply (forking) is faster and more memory-efficient but only works on Unix
# PSOCK clusters work on Windows but use more memory (full copy to each worker)
# Set PARALLEL_WINDOWS_MODE to "psock" for parallel on Windows, or "sequential" to save memory
# Set PARALLEL_WINDOWS_CORES to control number of workers on Windows (default: 8)

PARALLEL_WINDOWS_MODE <- getOption("parallel.windows.mode", default = "psock")
PARALLEL_WINDOWS_CORES <- getOption("parallel.windows.cores", default = 8)

#' Setup parallel processing backend
#' @param n_cores Number of cores to use. On Unix: default detectCores() - 1, max 8. 
#'   On Windows: uses PARALLEL_WINDOWS_CORES setting (default 8).
#' @return List with backend type and apply function to use
setup_parallel_backend <- function(n_cores = NULL) {
  
  if (.Platform$OS.type == "unix") {
    # Unix (macOS, Linux): Use forking via mclapply - fast and memory efficient
    if (is.null(n_cores)) {
      n_cores <- min(detectCores() - 1, 8)
    }
    n_cores <- max(1, n_cores)
    
    message("Platform: Unix - using mclapply (forking) with ", n_cores, " cores")
    return(list(
      type = "fork",
      n_cores = n_cores,
      apply_fn = function(X, FUN) {
        mclapply(X, FUN, mc.cores = n_cores)
      },
      cleanup_fn = function() invisible(NULL)
    ))
  } else {
    # Windows: forking not available
    if (PARALLEL_WINDOWS_MODE == "psock") {
      # Use Windows-specific core setting (default 4) to balance speed vs memory
      if (is.null(n_cores)) {
        n_cores <- PARALLEL_WINDOWS_CORES
      }
      n_cores <- max(1, min(n_cores, detectCores() - 1))
      
      message("Platform: Windows - using PSOCK cluster with ", n_cores, " workers")
      message("Note: PSOCK uses ~300-500MB per worker. Adjust with options(parallel.windows.cores = N)")
      
      # Create cluster
      cl <- makeCluster(n_cores, type = "PSOCK")
      
      return(list(
        type = "psock",
        n_cores = n_cores,
        cluster = cl,
        apply_fn = function(X, FUN) {
          parLapply(cl, X, FUN)
        },
        cleanup_fn = function() {
          stopCluster(cl)
          message("PSOCK cluster stopped")
        }
      ))
    } else {
      message("Platform: Windows - using sequential processing (memory-efficient)")
      message("To use parallel processing on Windows, set: options(parallel.windows.mode = 'psock')")
      message("Warning: Sequential processing will be slower but uses less memory")
      
      return(list(
        type = "sequential",
        n_cores = 1,
        apply_fn = function(X, FUN) {
          lapply(X, FUN)
        },
        cleanup_fn = function() invisible(NULL)
      ))
    }
  }
}

#' Export objects to PSOCK cluster workers
#' Only needed for PSOCK clusters; no-op for fork/sequential
#' @param backend Backend object from setup_parallel_backend()
#' @param varlist Character vector of variable names to export
#' @param envir Environment containing the variables
export_to_workers <- function(backend, varlist, envir = parent.frame()) {
  if (backend$type == "psock") {
    clusterExport(backend$cluster, varlist, envir = envir)
    # Load required packages on workers
    clusterEvalQ(backend$cluster, {
      suppressPackageStartupMessages({
        library(sf)
        library(dplyr)
      })
    })
    message("Exported ", length(varlist), " objects to PSOCK workers")
  }
  invisible(NULL)
}

# Load data ----
# Survey data
g <- readRDS(here("data", "inter", "survey_visibility_processed_with_geo.rds"))
if (st_crs(g) != crs.proj) {
  g <- st_transform(g, crs = crs.proj)
}

# Population density (from cache directory)
popd <- read_csv(get_popdensity_cache_path(), show_col_types = FALSE)

g <- left_join(g, popd, by = "response_id")

# Get green energy projects
eia <- readRDS(here("data", "inter", "eia_survey_processed.rds"))
eia <- st_as_sf(eia, coords = c("lon", "lat"), crs = coord.crs)
eia <- st_transform(eia, crs = crs.proj)
eia <- eia %>%
  rename(status_eia = status)

# Get supply chain projects
bgm <- read_csv(here("data", "inter", "turner_processed.csv"), show_col_types = FALSE)
bgm <- bgm %>%
  filter(!is.na(date)) %>%
  filter(!is.na(longitude) & !is.na(latitude)) %>%
  filter(!operating_status %in% c("Cancelled", "Closed", "Rumored")) %>%
  filter(year(date) <= project_year_cutoff & date >= as.Date("2022-08-16"))
bgm <- st_as_sf(bgm, coords = c("longitude", "latitude"), crs = coord.crs)
bgm <- st_transform(bgm, crs = crs.proj)
bgm <- bgm %>%
  rename(status_bgm = operating_status)

# Annotations
annotations <- read_csv(here("data", "inter", "annotated_statements.csv"), show_col_types = FALSE)

# Statement meta-data
statements <- read_csv(here("data", "inter", "statements_processed.csv"), show_col_types = FALSE)

annotated.statements <- left_join(statements, annotations, by = "statement_id")

annotated.statements <- annotated.statements %>%
  mutate(
    gives_statement = case_when(
      actor == "company_text" & company_text_category_1 != "None found" ~ 1, 
      actor == "biden_text" & biden_text_category_1 != "None found" ~ 1,
      actor == "senator_1_text" & senator_1_text_category_1 != "None found" ~ 1,
      actor == "senator_2_text" & senator_2_text_category_1 != "None found" ~ 1,
      actor == "representative_text" & representative_text_category_1 != "None found" ~ 1,
      actor == "governor_text" & governor_text_category_1 != "None found" ~ 1,
      TRUE ~ 0
    )
  )
summary(annotated.statements$gives_statement)
with(annotated.statements, prop.table(table(actor, gives_statement), 1))

annotated.statements <- subset(annotated.statements, select = c(id, actor, gives_credit:gives_statement))
# Calculate credit at project-level
annotated.statements <- annotated.statements %>%
  mutate(across(gives_credit:gives_statement, ~ replace_na(.x, 0))) %>%
  mutate(actor = gsub("_text", "", actor),
         actor = case_when(grepl("senator", actor) ~ "senator", actor == "representative" ~ "rep", actor == "governor" ~ "gov", TRUE ~ actor)) %>%
  group_by(id, actor) %>%
  summarize(across(gives_credit:gives_statement, ~ max(.x, na.rm = TRUE))) %>%
  pivot_wider(id_cols = id, names_from = actor, values_from = c(gives_credit:gives_statement))

bgm <- left_join(bgm, annotated.statements, by = c("site_id" = "id"))

# Geocoordinates quality check
# Flag energy projects that fall outside of shape boundaries
states <- states(cb = TRUE, year = 2021)
states <- st_transform(states, crs = crs.proj)

# Check BGM projects
bgm <- st_join(bgm, states, left = TRUE)
bgm$points_within <- as.integer(!is.na(bgm$NAME))
message(paste0("BGM projects within US (%): ", round(sum(bgm$points_within) / nrow(bgm) * 100, 2)))
bgm <- filter(bgm, points_within == 1)

# Check EIA projects
eia <- st_join(eia, states, left = TRUE)
eia$points_within <- as.integer(!is.na(eia$NAME))
message(paste0("EIA projects within US (%): ", round(sum(eia$points_within) / nrow(eia) * 100, 2)))
# Note: Keep because these are offshore wind turbines

# Calculate distance to projects ----

#' Calculate minimum distance between two spatial datasets using Albers projection
#'
#' Computes the distance between all points in two spatial datasets
#' and returns the minimum distance in kilometers. Uses the Albers Equal Area
#' projection for accurate distance calculations across the continental US.
#'
#' @param df1 Spatial data frame (sf object) containing origin points
#' @param df2 Spatial data frame (sf object) containing destination points
#'
#' @return Numeric value representing minimum distance in kilometers.
#'   Returns Inf if either dataset is empty.
#'
#' @details Both datasets should already be in the same projected coordinate system
#'   (Albers Equal Area). Does not use "Great Circle" parameter as this is meant
#'   for geographic coordinates, not projected coordinates.
#'
#' @examples
#' \dontrun{
#' # Calculate distance from respondent to nearest solar project
#' solar_projects <- subset(eia_data, technology == "Solar Photovoltaic")
#' min_dist <- get_distance(respondent_location, solar_projects)
#' }
get_distance <- function(df1, df2) {
  if (nrow(df2) == 0) return(Inf)
  
  # Calculate distance using projected coordinates (no "Great Circle" parameter)
  df1.distance <- st_distance(df1, df2)
  df1.distance.min <- as.numeric(min(df1.distance) / 1000)
  return(df1.distance.min)
}

#' Calculate distances from survey respondents to green energy projects
#'
#' @param survey.data sf object containing survey respondent data with geometry
#' @param project.data Data frame containing project data
#' @param var.name String specifying the name of the variable to store the distance
#'   in the output. Default: "d_proj"
#' @param days Numeric value specifying time window (in days) before survey date
#'   to consider for project inclusion. Default: 365 (one year)
#' @return Data frame with one row per survey respondent containing distance variables
#'   and timing variables. Includes: distance to closest project, population-weighted distance,
#'   quintiles, km categories, days_before (days between project announcement and survey date),
#'   and annotation data with variable-specific column names to avoid conflicts.
calc_dist2proj <- function(survey.data, project.data, var.name = "d_proj", days = 365) {
  
  message("Starting distance calculation...")
  message("Survey respondents: ", nrow(survey.data))
  message("Projects: ", nrow(project.data))
  message("Time window: ", days, " days before survey date") 
  
  # Ensure data is in correct CRS
  if (st_crs(survey.data) != crs.proj) {
    survey.data <- st_transform(survey.data, crs = crs.proj)
  }
  if (st_crs(project.data) != crs.proj) {
    project.data <- st_transform(project.data, crs = crs.proj)
  }
  message("CRS Aligned: ", st_crs(survey.data))
  
  # Get all possible annotation columns once at the beginning
  annotation.cols <- names(project.data)[grepl("^(gives_credit|credit_|mfg_activity|target_jobs|current_jobs|invest_m|cap|above_|status|sector|technology|gives_statement)", names(project.data))]
  message("Found annotation columns: ", paste(annotation.cols, collapse = ", "))
  
  # Check if project.data is empty
  if (nrow(project.data) == 0) {
    stop("No projects in project.data. Cannot calculate distances.")
  }
  
  # Setup parallel processing - cross-platform
  n.resp <- nrow(survey.data)
  backend <- setup_parallel_backend()
  
  # Ensure cleanup on exit
  on.exit(backend$cleanup_fn(), add = TRUE)
  
  # For PSOCK clusters, we need to export required objects to workers
  # Create local copies of variables needed in the worker function
  local_survey_data <- survey.data
  local_project_data <- project.data
  local_days <- days
  local_var_name <- var.name
  local_annotation_cols <- annotation.cols
  local_crs_proj <- crs.proj
  
  if (backend$type == "psock") {
    export_to_workers(backend, 
                      c("local_survey_data", "local_project_data", "local_days", 
                        "local_var_name", "local_annotation_cols", "local_crs_proj"),
                      envir = environment())
  }
  
  # Define the worker function
  worker_fn <- function(i) {
    tryCatch({
      # Use local copies for PSOCK compatibility
      survey.data <- if (exists("local_survey_data")) local_survey_data else get("survey.data", envir = parent.frame(2))
      project.data <- if (exists("local_project_data")) local_project_data else get("project.data", envir = parent.frame(2))
      days <- if (exists("local_days")) local_days else get("days", envir = parent.frame(2))
      var.name <- if (exists("local_var_name")) local_var_name else get("var.name", envir = parent.frame(2))
      annotation.cols <- if (exists("local_annotation_cols")) local_annotation_cols else get("annotation.cols", envir = parent.frame(2))
      
      if (i %% 500 == 0) {  # Less frequent messages in parallel mode
        message("Processing respondent ", i, " of ", nrow(survey.data), " (", round(i/nrow(survey.data)*100, 1), "%)")
      }
      
      g.sub <- survey.data[i, ]
    
    # Filter projects to only include those announced within the time window before survey
    valid_projects <- project.data
    if ("date" %in% names(project.data) && "start_date" %in% names(g.sub)) {
      survey_date <- g.sub$start_date
      if (!is.na(survey_date)) {
        # Only keep projects announced between (survey_date - days) and survey_date
        cutoff_date <- survey_date - days
        valid_projects <- project.data[
          !is.na(project.data$date) & 
          project.data$date >= cutoff_date & 
          project.data$date <= survey_date, 
        ]
      }
    }
    
    # Find the closest project from valid (time-filtered) projects
    if (nrow(valid_projects) == 0) {
      distances_to_all <- numeric(0)
    } else {
      distances_to_all <- st_distance(g.sub, valid_projects)
    }
    
    # Check if distances were calculated successfully
    if (length(distances_to_all) == 0 || all(is.na(distances_to_all))) {
      closest_project_distance <- NA_real_
      closest_project_idx <- NA_integer_
    } else {
      closest_project_idx <- which.min(distances_to_all)
      closest_project_distance <- as.numeric(min(distances_to_all, na.rm = TRUE) / 1000)  # Convert to km
    }

    # Create base data frame with response_id and distance to closest project
    df_base <- data.frame(response_id = g.sub$response_id)
    df_base[[var.name]] <- closest_project_distance
    
    # Calculate days between closest project announcement and survey date
    if (!is.na(closest_project_idx)) {
      closest_project <- valid_projects[closest_project_idx, ]
      
      # Get project date (handle sf objects)
      if ("date" %in% names(closest_project)) {
        project_date <- closest_project$date
        survey_date <- g.sub$start_date
        
        # Calculate days before survey for the closest project
        # Positive values = closest project announced before survey 
        # Negative values = closest project announced after survey 
        if (!is.na(project_date) && !is.na(survey_date)) {
          days_diff <- as.numeric(survey_date - project_date)
          df_base[[paste0(var.name, "_days_before")]] <- days_diff
        } else {
          df_base[[paste0(var.name, "_days_before")]] <- NA_real_
        }
      } else {
        df_base[[paste0(var.name, "_days_before")]] <- NA_real_
      }
    } else {
      df_base[[paste0(var.name, "_days_before")]] <- NA_real_
    }

    if (length(annotation.cols) > 0) {
      # Get annotation information for closest project
      if (!is.na(closest_project_idx)) {
        closest.project.annotation <- valid_projects[closest_project_idx, annotation.cols, drop = FALSE]
        
        # Add all annotation columns from closest project with variable-specific names
        if (nrow(closest.project.annotation) > 0) {
          for (col in annotation.cols) {
            # Create variable-specific column name to avoid conflicts
            new_col_name <- paste0(col, "_", var.name)
            df_base[[new_col_name]] <- closest.project.annotation[[col]]
          }
        } else {
          # Add NA values if no closest project found
          for (col in annotation.cols) {
            new_col_name <- paste0(col, "_", var.name)
            df_base[[new_col_name]] <- NA_integer_
          }
        }
      } else {
        # Add NA values if no closest project index
        for (col in annotation.cols) {
          new_col_name <- paste0(col, "_", var.name)
          df_base[[new_col_name]] <- NA_integer_
        }
      }
    } 
    
    return(df_base)
    }, error = function(e) {
      # Return error info for debugging - include response_id if available
      resp_id <- tryCatch({
        sd <- if (exists("local_survey_data")) local_survey_data else NULL
        if (!is.null(sd)) sd[i, ]$response_id else NA
      }, error = function(x) NA)
      stop(sprintf("Worker error at index %d (response_id: %s): %s", i, resp_id, e$message))
    })
  }
  
  # Process all respondents using the appropriate backend
  df.out <- backend$apply_fn(seq_len(n.resp), worker_fn)
  
  # Combine all results
  df.return <- do.call(rbind, df.out)
  
  # Calculate composite measures
  message("Creating composite measures...")
  
  # Check if we have any finite distances to create quantiles
  finite_distances <- df.return[[var.name]][is.finite(df.return[[var.name]])]
  
  if (length(finite_distances) > 0) {
    # Create quintiles only if we have finite distances
    q_breaks <- quantile(finite_distances, probs = seq(.2, .8, .2), na.rm = TRUE)
    df.return[[paste0(var.name, "_q")]] <- cut(df.return[[var.name]], c(-Inf, q_breaks, Inf), include.lowest = TRUE, labels = c("Q1", "Q2", "Q3", "Q4", "Q5"))
  } else {
    # If no finite distances, create a single category
    message("Warning: No finite distances found. Creating single category.")
    df.return[[paste0(var.name, "_q")]] <- factor(rep("No_Projects", nrow(df.return)))
  }
  
  # Only create km categories if we have finite distances
  if (length(finite_distances) > 0) {
  df.return[[paste0(var.name, "_km")]] <- cut(df.return[[var.name]], c(0, 15, 25, 50, 100, Inf), include.lowest = TRUE, labels = c("0-15", "15-25", "25-50", "50-100", "100+"))
  } else {
    # If no finite distances, create a single category
    df.return[[paste0(var.name, "_km")]] <- factor(rep("No_Projects", nrow(df.return)))
  }
  
  # Create quintiles and distance bins
  message("Creating quintiles and distance bins...")
  
  # Add timing diagnostics
  timing_col <- paste0(var.name, "_days_before")
  if (timing_col %in% names(df.return)) {
    finite_timing <- df.return[[timing_col]][is.finite(df.return[[timing_col]])]
    total_projects <- sum(!is.na(df.return[[var.name]]))
    valid_timing <- sum(!is.na(df.return[[timing_col]]))
    
    if (length(finite_timing) > 0) {
      n_positive <- sum(finite_timing >= 0)
      n_negative <- sum(finite_timing < 0)
      
      message("Timing summary for ", var.name, ":")
      message("  Projects with valid timing: ", valid_timing, "/", total_projects)
      message("  Closest projects announced before survey: ", n_positive)
      message("  Closest projects announced after survey: ", n_negative)
      message("  Mean days before survey: ", round(mean(finite_timing, na.rm = TRUE), 1))
      message("  Median days before survey: ", round(median(finite_timing, na.rm = TRUE), 1))
      message("  Range: ", round(min(finite_timing, na.rm = TRUE), 1), " to ", round(max(finite_timing, na.rm = TRUE), 1), " days")
      if (n_negative > 0) {
        message("  [WARN] Found ", n_negative, " negative values - this indicates a filtering error!")
      } else {
        message("  [OK] All projects properly filtered to be before survey date")
      }
    } else {
      message("Warning: No valid timing data found for ", var.name)
    }
  }
  
  # Only relevel if the reference categories exist
  q_col <- df.return[[paste0(var.name, "_q")]]
  if (!is.null(q_col) && "Q5" %in% levels(q_col)) {
    df.return[[paste0(var.name, "_q")]] <- relevel(factor(q_col), ref = "Q5")
  }
  
  km_col <- df.return[[paste0(var.name, "_km")]]
  if (!is.null(km_col) && "100+" %in% levels(km_col)) {
    df.return[[paste0(var.name, "_km")]] <- relevel(factor(km_col), ref = "100+")
  }
  
  message("Distance calculation completed successfully!")
  return(df.return)
}

#' Calculate distances from survey respondents to "placebo" projects announced AFTER survey date
#'
#' @param survey.data sf object containing survey respondent data with geometry
#' @param project.data Data frame containing project data
#' @param days Numeric value specifying time window (in days) after survey date
#'   to consider for project inclusion. Default: 365 (one year)
#' @param var.name String specifying the name of the variable to store the distance
#'   in the output. Default: "d_proj_placebo"
#' @return Data frame with one row per survey respondent containing distance variables
#'   for projects announced AFTER their survey date (placebo test)
calc_dist2proj_placebo <- function(survey.data, project.data, var.name = "d_proj_placebo", days = 365) {
  
  message("Starting PLACEBO distance calculation...")
  message("Survey respondents: ", nrow(survey.data))
  message("Projects: ", nrow(project.data))
  message("Time window: ", days, " days AFTER survey date") 
  
  # Ensure data is in correct CRS
  if (st_crs(survey.data) != crs.proj) {
    survey.data <- st_transform(survey.data, crs = crs.proj)
  }
  if (st_crs(project.data) != crs.proj) {
    project.data <- st_transform(project.data, crs = crs.proj)
  }
  
  # Check if project.data is empty
  if (nrow(project.data) == 0) {
    stop("No projects in project.data. Cannot calculate distances.")
  }
  
  # Setup parallel processing - cross-platform
  n.resp <- nrow(survey.data)
  backend <- setup_parallel_backend()
  
  # Ensure cleanup on exit
  on.exit(backend$cleanup_fn(), add = TRUE)
  
  # For PSOCK clusters, we need to export required objects to workers
  local_survey_data <- survey.data
  local_project_data <- project.data
  local_days <- days
  local_var_name <- var.name
  
  if (backend$type == "psock") {
    export_to_workers(backend, 
                      c("local_survey_data", "local_project_data", "local_days", "local_var_name"),
                      envir = environment())
  }
  
  # Define the worker function
  worker_fn_placebo <- function(i) {
    tryCatch({
      # Use local copies for PSOCK compatibility
      survey.data <- if (exists("local_survey_data")) local_survey_data else get("survey.data", envir = parent.frame(2))
      project.data <- if (exists("local_project_data")) local_project_data else get("project.data", envir = parent.frame(2))
      days <- if (exists("local_days")) local_days else get("days", envir = parent.frame(2))
      var.name <- if (exists("local_var_name")) local_var_name else get("var.name", envir = parent.frame(2))
      
      if (i %% 500 == 0) {
        message("Processing placebo respondent ", i, " of ", nrow(survey.data), " (", round(i/nrow(survey.data)*100, 1), "%)")
      }
      
      g.sub <- survey.data[i, ]
    
    # Filter projects to only include those announced AFTER survey date
    valid_projects <- project.data
    if ("date" %in% names(project.data) && "start_date" %in% names(g.sub)) {
      survey_date <- g.sub$start_date
      if (!is.na(survey_date)) {
        # Only keep projects announced between survey_date and (survey_date + days)
        cutoff_date <- survey_date + days
        valid_projects <- project.data[
          !is.na(project.data$date) & 
          project.data$date > survey_date & 
          project.data$date <= cutoff_date, 
        ]
      }
    }
    
    # Find the closest project from valid (time-filtered) projects
    if (nrow(valid_projects) == 0) {
      distances_to_all <- numeric(0)
    } else {
      distances_to_all <- st_distance(g.sub, valid_projects)
    }
    
    # Check if distances were calculated successfully
    if (length(distances_to_all) == 0 || all(is.na(distances_to_all))) {
      closest_project_distance <- NA_real_
      closest_project_idx <- NA_integer_
    } else {
      closest_project_idx <- which.min(distances_to_all)
      closest_project_distance <- as.numeric(min(distances_to_all, na.rm = TRUE) / 1000)
    }

    # Create base data frame with response_id and distance to closest project
    df_base <- data.frame(response_id = g.sub$response_id)
    df_base[[var.name]] <- closest_project_distance
    
    # Calculate days AFTER survey for the closest project
    if (!is.na(closest_project_idx)) {
      closest_project <- valid_projects[closest_project_idx, ]
      
      if ("date" %in% names(closest_project)) {
        project_date <- closest_project$date
        survey_date <- g.sub$start_date
        
        # Calculate days after survey for the closest project
        # Positive values = closest project announced after survey 
        if (!is.na(project_date) && !is.na(survey_date)) {
          days_diff <- as.numeric(project_date - survey_date)
          df_base[[paste0(var.name, "_days_after")]] <- days_diff
        } else {
          df_base[[paste0(var.name, "_days_after")]] <- NA_real_
        }
      } else {
        df_base[[paste0(var.name, "_days_after")]] <- NA_real_
      }
    } else {
      df_base[[paste0(var.name, "_days_after")]] <- NA_real_
    }
    
    return(df_base)
    }, error = function(e) {
      # Return error info for debugging - include response_id if available
      resp_id <- tryCatch({
        sd <- if (exists("local_survey_data")) local_survey_data else NULL
        if (!is.null(sd)) sd[i, ]$response_id else NA
      }, error = function(x) NA)
      stop(sprintf("Placebo worker error at index %d (response_id: %s): %s", i, resp_id, e$message))
    })
  }
  
  # Process all respondents using the appropriate backend
  df.out <- backend$apply_fn(seq_len(n.resp), worker_fn_placebo)
  
  # Combine all results
  df.return <- do.call(rbind, df.out)
  
  # Calculate composite measures
  message("Creating composite measures for placebo...")
  
  # Check if we have any finite distances to create quantiles
  finite_distances <- df.return[[var.name]][is.finite(df.return[[var.name]])]
  
  if (length(finite_distances) > 0) {
    # Create quintiles only if we have finite distances
    q_breaks <- quantile(finite_distances, probs = seq(.2, .8, .2), na.rm = TRUE)
    df.return[[paste0(var.name, "_q")]] <- cut(df.return[[var.name]], c(-Inf, q_breaks, Inf), include.lowest = TRUE, labels = c("Q1", "Q2", "Q3", "Q4", "Q5"))
  } else {
    message("Warning: No finite distances found for placebo. Creating single category.")
    df.return[[paste0(var.name, "_q")]] <- factor(rep("No_Projects", nrow(df.return)))
  }
  
  # Create km categories
  if (length(finite_distances) > 0) {
    df.return[[paste0(var.name, "_km")]] <- cut(df.return[[var.name]], c(0, 15, 25, 50, 100, Inf), include.lowest = TRUE, labels = c("0-15", "15-25", "25-50", "50-100", "100+"))
  } else {
    df.return[[paste0(var.name, "_km")]] <- factor(rep("No_Projects", nrow(df.return)))
  }
  
  # Add timing diagnostics
  timing_col <- paste0(var.name, "_days_after")
  if (timing_col %in% names(df.return)) {
    finite_timing <- df.return[[timing_col]][is.finite(df.return[[timing_col]])]
    total_projects <- sum(!is.na(df.return[[var.name]]))
    valid_timing <- sum(!is.na(df.return[[timing_col]]))
    
    if (length(finite_timing) > 0) {
      message("Placebo timing summary for ", var.name, ":")
      message("  Projects with valid timing: ", valid_timing, "/", total_projects)
      message("  Mean days after survey: ", round(mean(finite_timing, na.rm = TRUE), 1))
      message("  Median days after survey: ", round(median(finite_timing, na.rm = TRUE), 1))
      message("  Range: ", round(min(finite_timing, na.rm = TRUE), 1), " to ", round(max(finite_timing, na.rm = TRUE), 1), " days")
    }
  }
  
  # Relevel factors
  q_col <- df.return[[paste0(var.name, "_q")]]
  if (!is.null(q_col) && "Q5" %in% levels(q_col)) {
    df.return[[paste0(var.name, "_q")]] <- relevel(factor(q_col), ref = "Q5")
  }
  
  km_col <- df.return[[paste0(var.name, "_km")]]
  if (!is.null(km_col) && "100+" %in% levels(km_col)) {
    df.return[[paste0(var.name, "_km")]] <- relevel(factor(km_col), ref = "100+")
  }
  
  message("Placebo distance calculation completed successfully!")
  return(df.return)
}

# 1. Create your project dataframes with different filtering criteria
projects_list <- list(
  # EIA projects - add your filtering criteria here
  solar = eia %>% filter(technology == "Solar Photovoltaic"),
  wind = eia %>% filter(technology %in% c("Onshore Wind Turbine", "Offshore Wind Turbine")), 
  battery = eia %>% filter(technology == "Batteries"),
  re = eia %>% filter(technology %in% c("Onshore Wind Turbine", "Offshore Wind Turbine", "Solar Photovoltaic")),
  
  # BGM projects 
  bat_mfg = bgm %>% filter(sector == "Batteries"),
  ev_mfg = bgm %>% filter(sector == "EVs"),
  solar_mfg = bgm %>% filter(sector == "Solar"),
  wind_mfg = bgm %>% filter(sector == "Wind"),

  mfg_any = bgm,

  mfg = bgm %>% filter(construction == 1 | operating == 1),

  # BGM projects based on construction status
  mfg_con = bgm %>% filter(construction == 1),
  mfg_open = bgm %>% filter(operating == 1)
)

# 2. Define corresponding variable names (must match order of projects_list)
var_names <- c("d_solar", "d_wind", "d_battery", "d_re",
               "d_bat_mfg", "d_ev_mfg", "d_solar_mfg", "d_wind_mfg", "d_mfg_any", "d_mfg",
               "d_mfg_con", "d_mfg_open")

# Validate input before calculation
if (length(projects_list) != length(var_names)) {
  stop("Number of projects in projects_list does not match number of variable names in var_names")
}

# 4. Run calculations for all project types
message("Running distance calculations for ", length(projects_list), " project types...")

# Standard 1-year window calculations
results_list <- map2(projects_list, var_names, ~{
  message("Calculating distances for: ", .y, " (365 days window)")
  calc_dist2proj(survey.data = g, project.data = .x, var.name = .y, days = 365)
})

# Additional 2-year window calculations for all project types
message("Running 2-year window calculations...")
results_2y_list <- map2(projects_list, var_names, ~{
  var_name_2y <- paste0(.y, "_2y")
  message("Calculating 2-year distances for: ", var_name_2y)
  calc_dist2proj(survey.data = g, project.data = .x, var.name = var_name_2y, days = 365*2)
})

# 5. Merge all results into master dataframe
message("Merging all distance results...")
distance_master <- reduce(results_list, ~left_join(.x, .y, by = "response_id"))

# Merge 2-year results
message("Merging 2-year distance results...")
distance_master <- reduce(results_2y_list, ~left_join(.x, .y, by = "response_id"), .init = distance_master)

message("Master dataframe created with ", nrow(distance_master), " rows and ", ncol(distance_master), " columns")
message("Distance variables: ", paste(var_names, collapse = ", "))

# Save output to cache directory (persists across runs, used by anonymized mode)
saveRDS(distance_master, get_distance_cache_path())
message("Saved to ", get_distance_cache_path())

} # End of full mode block